AWS S3 Integration with PySpark: Data Cleaning and Storage
Introduction
Amazon Simple Storage Service (S3) is a widely used cloud object storage service that integrates seamlessly with Apache Spark. In this blog, we'll demonstrate how to connect PySpark with AWS S3, clean data using regular expressions, and store the processed data in an RDS database.
1. Setting Up AWS Credentials
Before accessing AWS S3, you need to authenticate using your Access Key and Secret Key. Ensure that sensitive credentials are securely stored and not hardcoded.
import urllib.parse

# Shown inline for demonstration only -- keep real credentials out of notebooks.
ACCESS_KEY = "your-access-key"
SECRET_KEY = "your-secret-key"

# URL-encode the secret key so characters such as "/" don't break the s3a URI.
ENCODED_SECRET_KEY = urllib.parse.quote(SECRET_KEY, "")

AWS_BUCKET_NAME = "your-bucket-name"
MOUNT_NAME = "s3data"
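If you're running this on Databricks, one way to keep the keys out of the notebook entirely is to read them from a secret scope. The scope and key names below (aws, access-key, secret-key) are placeholders for values you would set up yourself beforehand.
# Hypothetical secret scope and key names -- replace with your own.
ACCESS_KEY = dbutils.secrets.get(scope="aws", key="access-key")
SECRET_KEY = dbutils.secrets.get(scope="aws", key="secret-key")
ENCODED_SECRET_KEY = urllib.parse.quote(SECRET_KEY, "")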
2. Mounting AWS S3 to Databricks File System (DBFS)
Mounting S3 to Databricks allows seamless access to stored files.
# Mount the bucket under /mnt/<MOUNT_NAME> using the s3a connector.
dbutils.fs.mount(
    "s3a://%s:%s@%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME),
    "/mnt/%s" % MOUNT_NAME
)

# List the mounted files to confirm the mount succeeded.
display(dbutils.fs.ls("/mnt/%s" % MOUNT_NAME))
Output:
Path
/mnt/s3data/sample-data.csv
/mnt/s3data/logs/
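On a shared cluster the mount point may already be in use, in which case dbutils.fs.mount raises an error. A small guard like the sketch below (reusing the same MOUNT_NAME) unmounts first; whether you prefer to unmount or simply skip re-mounting is up to you.
# Unmount first if the mount point already exists, then mount as before.
mount_point = "/mnt/%s" % MOUNT_NAME
if any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)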
3. Loading Data from S3
Once mounted, we can load CSV files into a PySpark DataFrame.
data_path = "/mnt/s3data/sample-data.csv"
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(data_path)
df.show()
Output:
+-----+----------+------------------+
| ID  | Name     | Email            |
+-----+----------+------------------+
| 101 | John Doe | john@example.com |
| 102 | Jane Doe | jane@example.com |
+-----+----------+------------------+
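Since inferSchema is enabled, it's worth checking what Spark actually inferred before cleaning anything:
df.printSchema()  # confirm the inferred column types (e.g. ID as an integer)
print(df.count(), "rows loaded")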
4. Data Cleaning Using Regular Expressions
4.1 Removing Special Characters from Column Names
Column names may contain unwanted special characters that need to be cleaned.
import re
from pyspark.sql.functions import col, regexp_replace

# Strip every non-alphanumeric character from each column name.
cols = [re.sub("[^a-zA-Z0-9]", "", c) for c in df.columns]
df = df.toDF(*cols)
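A quick sanity check confirms the renamed columns:
print(df.columns)  # e.g. ['ID', 'Name', 'Email'] once special characters are stripped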
4.2 Cleaning Email Data
Let's remove the @ and . characters from the email addresses.
df = df.withColumn("Email", regexp_replace(col("Email"), "[@.]", ""))
df.show()
Output:
+-----+----------+----------------+
| ID  | Name     | Email          |
+-----+----------+----------------+
| 101 | John Doe | johnexamplecom |
| 102 | Jane Doe | janeexamplecom |
+-----+----------+----------------+
5. Storing Cleaned Data in RDS Database
5.1 Configuring Database Connection
host = "jdbc:mysql://your-rds-instance.amazonaws.com:3306/yourdb"
username = "admin"
password = "yourpassword"
dbtable = "cleaned_data"
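Depending on the cluster, Spark may not resolve the MySQL driver on its own. A hedged option is to bundle the connection settings, including the standard MySQL Connector/J driver class, into a dictionary; the driver JAR itself still has to be installed on the cluster.
# Connection options reused for every JDBC write; the driver class assumes MySQL Connector/J is installed.
connection_options = {
    "url": host,
    "user": username,
    "password": password,
    "driver": "com.mysql.cj.jdbc.Driver",
}
With this dictionary in place, the write in the next step could also be expressed as df.write.format("jdbc").options(**connection_options).option("dbtable", dbtable).mode("append").save().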
5.2 Writing Data to MySQL
df.write.mode("append").format("jdbc").option("url", host) \
    .option("user", username) \
    .option("password", password) \
    .option("dbtable", dbtable) \
    .save()
Output:
Data successfully written to RDS table: cleaned_data
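To double-check that the rows actually landed in RDS, one option is to read the table back over the same JDBC connection and count them:
# Read the freshly written table back for a quick row-count check.
check_df = (spark.read.format("jdbc")
            .option("url", host)
            .option("user", username)
            .option("password", password)
            .option("dbtable", dbtable)
            .load())
print(check_df.count(), "rows in", dbtable)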
6. Batch Processing Multiple Files from S3
We can loop through files in S3, load each one into a DataFrame, and store it in the RDS database.
import os

data_folder = "/mnt/s3data/csvdata"

# Pick out the target CSV files from the mounted folder.
files = dbutils.fs.ls(data_folder)
file_paths = [f.path for f in files if "us-500" in f.path and f.path.endswith(".csv")]

for p in file_paths:
    # Derive a clean table name from the file name (e.g. "us-500" -> "us500").
    filename = os.path.splitext(os.path.basename(p))[0]
    tablename = re.sub(r'[^a-zA-Z0-9]', '', filename)
    print(f"Importing {p} into table {tablename}")

    df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(p)
    df.write.mode("overwrite").format("jdbc").option("url", host) \
        .option("user", username) \
        .option("password", password) \
        .option("dbtable", tablename) \
        .save()
Output:
Importing /mnt/s3data/csvdata/us-500.csv into table us500
Data successfully written to RDS table: us500
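One file with a bad schema or a dropped connection will stop the loop above at that point. A hedged variant wraps the per-file work in a helper and a try/except so the remaining files still get imported; skipping versus aborting on failure is a design choice that depends on how critical each file is.
def import_file(p):
    """Read one CSV from the mount and write it to RDS, mirroring the steps above."""
    filename = os.path.splitext(os.path.basename(p))[0]
    tablename = re.sub(r'[^a-zA-Z0-9]', '', filename)
    df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(p)
    df.write.mode("overwrite").format("jdbc").option("url", host) \
        .option("user", username) \
        .option("password", password) \
        .option("dbtable", tablename) \
        .save()
    return tablename

failed = []
for p in file_paths:
    try:
        print("Imported", p, "into table", import_file(p))
    except Exception as e:
        # Log the failure and continue with the next file.
        print(f"Failed to import {p}: {e}")
        failed.append(p)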
Conclusion
In this blog, we explored how to:
- Mount an AWS S3 bucket to Databricks.
- Read data from S3 into PySpark.
- Clean data using regex operations.
- Store processed data into an RDS database.
This approach helps in efficient data processing and ensures that cleaned data is stored securely for further analysis.